/*
 * Copyright (c) Members of the EGEE Collaboration. 2004.
 * See http://eu-egee.org/partners/ for details on the copyright holders.
 * For license conditions see the license file or http://eu-egee.org/license.html
 */

package org.glite.rgma.server.services.producer.store;

import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.naming.ConfigurationException;

import org.glite.rgma.server.services.ServerConfig;
import org.glite.rgma.server.services.ServerConstants;
import org.glite.rgma.server.services.database.HSQLDBConnection;
import org.glite.rgma.server.services.sql.ColumnDefinition;
import org.glite.rgma.server.services.sql.CreateIndexStatement;
import org.glite.rgma.server.services.sql.CreateTableStatement;
import org.glite.rgma.server.services.sql.InsertStatement;
import org.glite.rgma.server.services.sql.OrderBy;
import org.glite.rgma.server.services.sql.SelectItem;
import org.glite.rgma.server.services.sql.SelectStatement;
import org.glite.rgma.server.services.sql.TableReference;
import org.glite.rgma.server.services.sql.UpdateStatement;
import org.glite.rgma.server.system.NumericException;
import org.glite.rgma.server.system.RGMAPermanentException;
import org.glite.rgma.server.system.ResourceEndpoint;
import org.glite.rgma.server.system.TupleSet;
import org.glite.rgma.server.system.TupleSetWithLastTUID;

/**
 * HSQLDB implementation of DatabaseInstance.
 */

public class HSQLDBTupleStoreDatabase extends TupleStoreDatabaseBase implements TupleStoreDatabase {

	private class HSQLDBOneTimeCursor {

		private boolean m_active = true;

		/** Index of next tuple to return. */
		private int m_currentIndex;

		/** Cursor ID. */
		private int m_id;

		/** Number of tuples in the temporary table. */
		private int m_numTuples;

		/** SELECT query. */
		private SelectStatement m_selectStatement;

		/** Temporary table name. */
		private String m_tableName;;

		/**
		 * Creates a new Cursor.
		 * 
		 * @param id
		 *            Cursor ID.
		 * @param connection
		 *            Connection to MySQL DB.
		 * @param selectStatement
		 *            SELECT query.
		 * @throws RGMAPermanentException
		 * @throws RGMAPermanentException
		 */
		private HSQLDBOneTimeCursor(int id, SelectStatement selectStatement) throws RGMAPermanentException, NumericException {
			m_id = id;
			m_selectStatement = selectStatement;
			m_currentIndex = 0;
			m_tableName = CURSOR_TABLE_PREFIX + m_id;
			try {
				m_numTuples = m_connection.executeUpdate(getCreateStatement());
				m_connection.executeQuery("ALTER TABLE " + m_tableName + " ADD COLUMN " + ReservedColumns.RGMA_TUID_ONE_OFF_COLUMN_NAME
						+ " INTEGER GENERATED BY DEFAULT AS IDENTITY");
			} catch (SQLException e) {
				if (e.getErrorCode() == -45) { /* label required */
					throw new NumericException(id, e.getMessage());
				}
				throw new RGMAPermanentException(e);
			}
		}

		/**
		 * Closes this cursor and removes the temporary table.
		 */
		synchronized private void close() {
			if (m_active) {
				try {
					m_connection.executeUpdate("DROP TABLE " + m_tableName);
					m_active = false;
				} catch (SQLException e) {
					// Do nothing.
				}
				if (LOG.isDebugEnabled()) {
					LOG.debug("Cursor " + m_tableName + " closed.");
				}
			} else {
				if (LOG.isDebugEnabled()) {
					LOG.debug("Cursor " + m_tableName + " already closed.");
				}
			}
		}

		/**
		 * Fetches at most <code>maxRows</code> rows from this cursor.
		 * 
		 * @param maxRows
		 *            Maximum number of rows to return.
		 * @return At most <code>maxRows</code> rows from this cursor.
		 * @throws SQLException
		 * @throws RGMAPermanentException
		 */
		private TupleSetWithLastTUID fetch(int maxRows) throws SQLException, RGMAPermanentException {
			int tuplesLeft = m_numTuples - m_currentIndex;
			boolean endOfResults = maxRows >= tuplesLeft;
			maxRows = Math.min(maxRows, tuplesLeft);
			String selectStr = "SELECT LIMIT " + m_currentIndex + " " + maxRows + " * FROM " + m_tableName + " ORDER BY "
					+ ReservedColumns.RGMA_TUID_ONE_OFF_COLUMN_NAME;
			java.sql.ResultSet jrs = m_connection.executeQuery(selectStr);
			m_currentIndex += maxRows;
			try {
				TupleSetWithLastTUID rs = convertResultSet(jrs);
				TupleSet ts = rs.getTupleSet();
				ts.setEndOfResults(endOfResults);
				if (LOG.isDebugEnabled()) {
					LOG.debug("Fetched " + ts.size() + " tuples from " + m_tableName);
				}
				return rs;
			} finally {
				if (jrs != null) {
					HSQLDBConnection.closeStatementFromResultSet(jrs);
				}
			}
		}

		/**
		 * Generates a SELECT...INTO statement for this cursor's select and table name.
		 * 
		 * @return String containing SELECT...INTO statement.
		 */
		private String getCreateStatement() {
			StringBuffer buf = new StringBuffer("SELECT ");

			if (m_selectStatement.isDistinct()) {
				buf.append("DISTINCT ");
			}

			boolean first = true;
			for (SelectItem select2 : m_selectStatement.getSelect()) {
				if (!first) {
					buf.append(", ");
				} else {
					first = false;
				}
				buf.append(select2);
			}

			buf.append(" INTO ").append(m_tableName);

			buf.append(" FROM ");

			first = true;
			for (TableReference tableRef : m_selectStatement.getFrom()) {
				if (!first) {
					buf.append(", ");
				} else {
					first = false;
				}
				buf.append(tableRef);
			}

			if (m_selectStatement.getWhere() != null) {
				buf.append(" WHERE " + m_selectStatement.getWhere().toString());
			}

			if (m_selectStatement.getGroupBy() != null) {
				buf.append(" " + m_selectStatement.getGroupBy().toString());
			}

			if (m_selectStatement.getOrderBy() != null) {
				buf.append(" ORDER BY ");
				first = true;
				for (OrderBy orderBy : m_selectStatement.getOrderBy()) {
					if (!first) {
						buf.append(", ");
					} else {
						first = false;
					}
					buf.append(orderBy);
				}

			}

			buf.append(";");
			return buf.toString();
		}

		/**
		 * This is to increase the probablity that the DB is cleaned up
		 */
		protected void finalize() throws RGMAPermanentException {
			close();
		}
	}

	/**
	 * Translates a CreateTableStatement into a HSQLDB compatible String.
	 */
	private static String translateCreateTable(CreateTableStatement createTable) {
		StringBuilder buf = new StringBuilder("CREATE TABLE " + createTable.getTableName() + " (");
		boolean firstCol = true;

		for (ColumnDefinition cd : createTable.getColumns()) {
			if (firstCol) {
				firstCol = false;
			} else {
				buf.append(", ");
			}
			buf.append(cd.getName() + " " + cd.getType());
			if (cd.isNotNull()) {
				buf.append(" NOT NULL");
			}
		}
		buf.append(")");
		return buf.toString();
	}

	/** Connection to HSQLDB database. */
	private HSQLDBConnection m_connection;

	/** Mapping from cursor ID to Cursor objects. */
	private Map<Integer, HSQLDBOneTimeCursor> m_cursors;

	/** Database URL. */
	private String m_dbURL;

	/** PT number to allocate next */
	private int m_ptn;

	/**
	 * Creates a new database instance connecting to the given HSQLDB database. Does not yet establish connection, just
	 * instantiates driver.
	 * 
	 * @param dbName
	 *            Database name.
	 * @throws SQLException
	 * @throws SQLException
	 *             If the connection can't be created.
	 * @throws ConfigurationException
	 */
	public HSQLDBTupleStoreDatabase() throws RGMAPermanentException {
		m_currentCursorID = 0;
		m_dbURL = ServerConfig.getInstance().getProperty(ServerConstants.MEMORY_DATABASE_LOCATION_URL);
		m_cursors = new HashMap<Integer, HSQLDBOneTimeCursor>();
		m_connection = new HSQLDBConnection(m_dbURL, "sa", "");
	}

	/**
	 * Removes the cursor from the list of cursors and closes it.
	 * 
	 * @see org.glite.rgma.server.services.database.TupleStoreDatabase#closeCursor(int)
	 */
	public void closeCursor(int cursorID) {
		HSQLDBOneTimeCursor cursor = m_cursors.remove(cursorID);
		if (cursor != null) {
			cursor.close();
		} // Do nothing if cursor did not exist.
	}

	public void closeTupleStore(List<String> physicalTableNames, boolean permanent) throws RGMAPermanentException {
		try {
			for (String ptn : physicalTableNames) {
				m_connection.executeUpdate("DROP TABLE " + ptn);
			}
		} catch (SQLException e) {
			throw new RGMAPermanentException(e);
		}
	}

	/**
	 * @see HSQLDBTupleStoreDatabase#count(String)
	 */
	public int count(String tableName) throws RGMAPermanentException {
		java.sql.ResultSet rs = null;
		try {
			rs = m_connection.executeQuery("SELECT COUNT(*) FROM " + tableName);
			rs.next();
			return rs.getInt(1);
		} catch (SQLException e) {
			if (e.getErrorCode() == -22) {
				LOG.debug("Unable to get count for table " + tableName + " as it no longer exists");
				return 0;
			}
			throw new RGMAPermanentException(e);
		} finally {
			if (rs != null) {
				HSQLDBConnection.closeStatementFromResultSet(rs);
			}
		}
	}

	public void createIndex(CreateIndexStatement icis) throws RGMAPermanentException {
		/*
		 * The index may be too long in which case we need to modify it. First we hope for the best. The limits below
		 * may not be needed for HSQLDB - we will see.
		 */
		int MaxColInIndex = 100;
		int MaxBytesInIndex = 10000;

		/* Make a new CreateIndexStatement but with a globally unique index name */
		CreateIndexStatement cis = new CreateIndexStatement(icis.getTableName() + icis.getIndexName());
		cis.setTableName(icis.getTableName());
		cis.setColumnNames(icis.getColumnNames());

		java.sql.ResultSet rs = null;
		try {
			m_connection.executeUpdate(cis.toString());
			LOG.debug("Executed " + cis);
		} catch (SQLException e) {
			try {
				/*
				 * Make sure it works with maximum size of index - total bytes and column count
				 */
				StringBuffer buf = new StringBuffer("CREATE INDEX ");
				buf.append(cis.getIndexName()).append(" ON ").append(cis.getTableName()).append(" (");
				int colNum = 0;
				int ncolIndex = Math.min(MaxColInIndex, cis.getColumnNames().size());
				int maxStringColumnInIndex = MaxBytesInIndex / ncolIndex;
				for (String colName : cis.getColumnNames()) {
					String type;
					try {
						rs = m_connection.executeQuery("DESCRIBE " + cis.getTableName() + " " + colName);
						rs.first();
						type = rs.getString("Type");
					} catch (Exception e1) {
						throw new RGMAPermanentException(e1);
					}
					buf.append(colName);
					if (type.startsWith("varchar")) {
						String lenString = type.substring(8, type.length() - 1);
						int lenInt = Integer.parseInt(lenString);
						if (maxStringColumnInIndex < lenInt) {
							buf.append("(" + maxStringColumnInIndex + ")");
						}
					}
					if (colNum < ncolIndex - 1) {
						buf.append(", ");
					} else if (colNum == ncolIndex - 1) {
						break;
					}
					colNum++;
				}
				buf.append(')');
				try {
					m_connection.executeUpdate(buf.toString());
					LOG.debug("Executed " + buf);
				} catch (SQLException e1) {
					LOG.error("SQL Exception " + e1.getMessage() + " while creating index " + buf);
				}
			} finally {
				if (rs != null) {
					HSQLDBConnection.closeStatementFromResultSet(rs);
				}
			}
		}
	}

	public synchronized String createTable(String ownerDN, String logicalName, String vdbTableName, String tableType, CreateTableStatement cts)
			throws RGMAPermanentException {
		String physicalTableName = null;
		try {
			physicalTableName = "PT" + m_ptn++;

			CreateTableStatement pcts = new CreateTableStatement(physicalTableName);
			pcts.setColumns(new ArrayList<ColumnDefinition>(cts.getColumns()));
			if (tableType.equals("H")) {
				pcts.getColumns().add(ReservedColumns.RGMA_INSERT_TIME_COLUMN);
				pcts.getColumns().add(ReservedColumns.RGMA_TUID_COLUMN);
			}

			String ctsStr = translateCreateTable(pcts);
			m_connection.executeUpdate(ctsStr);
			if (LOG.isDebugEnabled()) {
				LOG.debug("Physical table '" + physicalTableName + "' created for " + tableType + " '" + vdbTableName + "' " + logicalName + "/[" + ownerDN
						+ ']');
			}
			return physicalTableName;
		} catch (SQLException e) {
			throw new RGMAPermanentException(e);
		}
	}

	/**
	 * Deletes all tuples older than maxAgeSecs that have not been streamed to all known consumers. Calculates cut-off
	 * time as (currentTimeMillis() - maxAgeSecs * 1000) and executes DELETE statement. The lastReadTUID input is used
	 * to ensure that we do not remove tuples that have not been streamed to all consumers. The lastReadTUID may be -1
	 * if there are no consumers or > 0 to indicate the last tupleID streamed to all known consumers. It will not be
	 * called if has the value 0
	 * 
	 * @throws RGMAPermanentException
	 * @see org.glite.rgma.server.services.database.TupleStoreDatabase#delete(java.lang.String, int)
	 */
	public int deleteByHRP(String tableName, int maxAgeSecs, int lastReadTUID) throws RGMAPermanentException {
		if (lastReadTUID == 0) {
			throw new RGMAPermanentException("deleteByHRP does not accept 0 for lastReadTUID");
		}
		try {
			String timestampString = new Timestamp(System.currentTimeMillis() - maxAgeSecs * 1000L).toString();
			String deleteStr;
			if (lastReadTUID > 0) {
				deleteStr = "DELETE FROM " + tableName + " WHERE " + ReservedColumns.RGMA_INSERT_TIME_COLUMN_NAME + " < '" + timestampString + "' AND "
						+ ReservedColumns.RGMA_TUID_COLUMN_NAME + " <= " + lastReadTUID;
			} else {
				deleteStr = "DELETE FROM " + tableName + " WHERE " + ReservedColumns.RGMA_INSERT_TIME_COLUMN_NAME + " < '" + timestampString + "'";
			}
			return m_connection.executeUpdate(deleteStr);
		} catch (SQLException e) {
			if (e.getErrorCode() == -22) {
				LOG.debug("Unable to delete from table " + tableName + " as it no longer exists");
				return 0;
			}
			throw new RGMAPermanentException(e);
		}
	}

	/**
	 * @throws RGMAPermanentException
	 * @see HSQLDBTupleStoreDatabase#deleteByLRP(String)
	 */
	public int deleteByLRP(String tableName) throws RGMAPermanentException {
		try {
			return m_connection.executeUpdate("DELETE FROM " + tableName + " WHERE " + ReservedColumns.RGMA_LRT_COLUMN_NAME + " < NOW()");
		} catch (SQLException e) {
			if (e.getErrorCode() == -22) {
				LOG.debug("Unable to delete from table " + tableName + " as it no longer exists");
				return 0;
			}
			throw new RGMAPermanentException(e);
		}
	}

	/**
	 * @throws RGMAPermanentException
	 * @see org.glite.rgma.server.services.producer.store.TupleStoreList#remove(java.lang.String)
	 */
	public void dropTupleStore(String ownerDN, String logicalName) throws RGMAPermanentException {
		throw new RGMAPermanentException("dropTupleStore should not be called");
	}

	/**
	 * Retrieves the specified cursor, if it exists, and executes fetch on it.
	 * 
	 * @throws RGMAPermanentException
	 * @throws RGMAPermanentException
	 * @see org.glite.rgma.server.services.database.TupleStoreDatabase#fetch(int, int)
	 */
	public TupleSetWithLastTUID fetch(int cursorID, int maxRows) throws RGMAPermanentException {
		try {
			HSQLDBOneTimeCursor cursor = m_cursors.get(cursorID);
			if (cursor == null) {
				throw new RGMAPermanentException("Cursor " + cursorID + " does not exist.");
			}
			return cursor.fetch(maxRows);
		} catch (SQLException e) {
			throw new RGMAPermanentException(e);
		}
	}

	/**
	 * @throws RGMAPermanentException
	 * @see HSQLDBTupleStoreDatabase#findFirstTupleID(String, long)
	 */
	public int findFirstTupleID(String physicalTableName, long startTimeMS) throws RGMAPermanentException {
		java.sql.ResultSet rs = null;
		try {
			String timestampString = new Timestamp(startTimeMS).toString();
			String findFirstQuery = "SELECT " + ReservedColumns.RGMA_TUID_COLUMN_NAME + " FROM " + physicalTableName + " WHERE "
					+ ReservedColumns.RGMA_TIMESTAMP_COLUMN_NAME + " >= '" + timestampString + "' ORDER BY " + ReservedColumns.RGMA_TUID_COLUMN_NAME
					+ " LIMIT 1";
			int firstTupleID = 0; // First tuple ID is 0 if there are no tuples
			rs = m_connection.executeQuery(findFirstQuery);
			if (rs.next()) {
				firstTupleID = rs.getInt(1);
				if (LOG.isDebugEnabled()) {
					LOG.debug("Found first tuple ID in database: " + firstTupleID);
				}
			}
			return firstTupleID;
		} catch (SQLException e) {
			throw new RGMAPermanentException(e);
		} finally {
			if (rs != null) {
				HSQLDBConnection.closeStatementFromResultSet(rs);
			}
		}
	}

	public Map<ResourceEndpoint, Integer> getConsumerTUIDs(String physicalTableName) throws RGMAPermanentException {
		return new HashMap<ResourceEndpoint, Integer>();
	}

	public TupleSetWithLastTUID getContinuous(SelectStatement select, int maxCount) throws RGMAPermanentException {
		String selectStr = select + " LIMIT " + maxCount;
		java.sql.ResultSet jrs = null;
		try {
			jrs = m_connection.executeQuery(selectStr);
			TupleSetWithLastTUID rs = convertResultSet(jrs);
			return rs;
		} catch (SQLException e) {
			throw new RGMAPermanentException(e);
		} finally {
			if (jrs != null) {
				HSQLDBConnection.closeStatementFromResultSet(jrs);
			}
		}
	}

	public int getMaxTUID(String physicalTableName) throws RGMAPermanentException {
		java.sql.ResultSet rs = null;
		try {
			rs = m_connection.executeQuery("SELECT MAX(RgmaTUID) FROM " + physicalTableName);
			rs.next();
			return rs.getInt(1);
		} catch (SQLException e) {
			throw new RGMAPermanentException(e);
		} finally {
			if (rs != null) {
				HSQLDBConnection.closeStatementFromResultSet(rs);
			}
		}
	}

	/**
	 * @throws RGMAPermanentException
	 * @see org.glite.rgma.server.services.database.TupleStoreDatabase#insert(java.util.List, java.lang.String,
	 *      java.util.List)
	 */
	public void insert(InsertStatement insertStatement) throws RGMAPermanentException {
		try {
			m_connection.executeUpdate(insertStatement.toString());
		} catch (SQLException e) {
			throw new RGMAPermanentException(e);
		}
	}

	/**
	 * @throws RGMAPermanentException
	 * @see org.glite.rgma.server.services.producer.store.TupleStoreList#listTupleStores(String)
	 */
	public List<TupleStoreDetails> listTupleStores(String userDN) throws RGMAPermanentException {
		return new ArrayList<TupleStoreDetails>();
	}

	/**
	 * Creates a Cursor for the given SELECT statement and adds it to the list of cursors. Each cursor has its own
	 * connection to HSQLDB.
	 */
	public int openCursor(SelectStatement selectStatement) throws RGMAPermanentException, NumericException {
		int cursorID = nextCursorID();
		HSQLDBOneTimeCursor cursor = new HSQLDBOneTimeCursor(cursorID, selectStatement);
		m_cursors.put(cursorID, cursor);
		return cursorID;
	}

	/**
	 * Executes the SELECT statement on the database and converts the resulting data into an R-GMA ResultSet (including
	 * meta-data).
	 * 
	 * @throws RGMAPermanentException
	 * @see org.glite.rgma.server.services.database.TupleStoreDatabase#select(org.glite.rgma.server.services.sql.SelectStatement)
	 */
	public TupleSetWithLastTUID select(SelectStatement selectStatement) throws RGMAPermanentException {
		String select = selectStatement.toString();
		java.sql.ResultSet jrs = null;
		try {
			jrs = m_connection.executeQuery(select);
			TupleSetWithLastTUID resultSet = convertResultSet(jrs);
			return resultSet;
		} catch (SQLException e) {
			throw new RGMAPermanentException(e);
		} finally {
			if (jrs != null) {
				HSQLDBConnection.closeStatementFromResultSet(jrs);
			}
		}
	}

	public void shutdown() throws RGMAPermanentException {
		try {
			m_connection.executeUpdate("SHUTDOWN IMMEDIATELY");
		} catch (SQLException e) {
			throw new RGMAPermanentException(e);
		}
	}

	public void storeConsumerTUIDs(String physicalTableName, Map<ResourceEndpoint, Integer> consumerTUID) throws RGMAPermanentException {
		throw new RGMAPermanentException("storeConsumerTUIDs should not be called");
	}

	/**
	 * Executes the given update statement on the database.
	 * 
	 * @throws RGMAPermanentException
	 * @see org.glite.rgma.server.services.database.TupleStoreDatabase#update(org.glite.rgma.server.services.sql.UpdateStatement)
	 */
	public int update(UpdateStatement updateStatement) throws RGMAPermanentException {
		try {
			return m_connection.executeUpdate(updateStatement.toString());
		} catch (SQLException e) {
			throw new RGMAPermanentException(e);
		}
	}
}
